package com.flink.carmonitor;


import com.flink.dto.CardSpeedDTO;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;
import org.apache.flink.util.Collector;
import scala.Tuple2;

import java.time.Duration;
import java.util.Properties;

/**
 * Flink streaming job that, once per minute, computes the average vehicle speed over the
 * preceding five minutes for every checkpoint, keyed by area id, road id and monitor id.
 *
 * <p>Input is read from the Kafka topic {@code monitor_speed} as {@code #}-separated text with
 * seven fields, in order: card number, area id, road id, monitor id, camera id, action time,
 * speed. One line per key and window is printed in the form
 * {@code key,windowStart,windowEnd,averageSpeed}.
 */
public class MonitorAvgSpeedAnaly {

    /** How far behind the highest seen event time the watermark trails, to tolerate disorder. */
    private static final Duration MAX_OUT_OF_ORDERNESS = Duration.ofSeconds(5);

    /** Number of {@code #}-separated fields a well-formed input record carries. */
    private static final int EXPECTED_FIELD_COUNT = 7;

    /**
     * Builds the pipeline and submits it for execution.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be submitted or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // Read raw records from Kafka. No (de)serializer properties are set here: the previous
        // "key-serializer"/"value-serializer" entries were not valid consumer config keys, and
        // the value decoding is done by the SimpleStringSchema passed to the consumer.
        Properties properties = new Properties();
        properties.setProperty("group.id", "test222");
        properties.setProperty("bootstrap.servers", "172.16.12.148:9092,172.16.12.149:9092");
        properties.setProperty("auto.offset.reset", "latest");
        DataStream<String> ds1 = env.addSource(
                new FlinkKafkaConsumer<String>("monitor_speed", new SimpleStringSchema(), properties));

        // Parse each line and attach event-time timestamps/watermarks. Without watermarks the
        // event-time windows below would never fire.
        // NOTE(review): assumes getActionTime() is epoch milliseconds - confirm against the producer.
        DataStream<CardSpeedDTO> monitoryDs = ds1
                .map(new CardSpeedParser(), TypeInformation.of(CardSpeedDTO.class))
                .assignTimestampsAndWatermarks(
                        WatermarkStrategy.<CardSpeedDTO>forBoundedOutOfOrderness(MAX_OUT_OF_ORDERNESS)
                                .withTimestampAssigner((dto, recordTimestamp) -> dto.getActionTime()));

        // Sliding window: 5 minutes long, evaluated every minute, per area/road/monitor key.
        monitoryDs.keyBy(g -> g.getAreaId() + "_" + g.getRoadId() + "_" + g.getMonitorId())
                .window(SlidingEventTimeWindows.of(Time.minutes(5), Time.minutes(1)))
                .aggregate(new AverageSpeedAggregate(), new WindowAverageFormatter())
                .print();

        // The calls above only declare the dataflow; nothing runs until execute() is invoked.
        env.execute("MonitorAvgSpeedAnaly");
    }

    /** Parses one {@code #}-separated input line into a {@link CardSpeedDTO}. */
    private static final class CardSpeedParser implements MapFunction<String, CardSpeedDTO> {

        private static final long serialVersionUID = 1L;

        /**
         * @param value raw record text
         * @return the populated DTO
         * @throws IllegalArgumentException if the record has fewer than seven fields
         * @throws NumberFormatException if the action time or speed field is not numeric
         */
        @Override
        public CardSpeedDTO map(String value) throws Exception {
            String[] split = value.split("#");
            if (split.length < EXPECTED_FIELD_COUNT) {
                throw new IllegalArgumentException(
                        "Expected " + EXPECTED_FIELD_COUNT + " '#'-separated fields but got "
                                + split.length + ": " + value);
            }
            CardSpeedDTO dto = new CardSpeedDTO();
            dto.setCardNo(split[0].trim());
            dto.setAreaId(split[1].trim());
            dto.setRoadId(split[2].trim());
            dto.setMonitorId(split[3].trim());
            dto.setCameraId(split[4].trim());
            dto.setActionTime(Long.valueOf(split[5].trim()));
            dto.setSpeed(Double.valueOf(split[6].trim()));
            return dto;
        }
    }

    /**
     * Incrementally averages speeds. The accumulator is a {@code (count, sum)} pair and the
     * result is {@code sum / count}.
     */
    private static final class AverageSpeedAggregate
            implements AggregateFunction<CardSpeedDTO, Tuple2<Long, Double>, Double> {

        private static final long serialVersionUID = 1L;

        @Override
        public Tuple2<Long, Double> createAccumulator() {
            return new Tuple2<>(0L, 0.0);
        }

        @Override
        public Tuple2<Long, Double> add(CardSpeedDTO value, Tuple2<Long, Double> accumulator) {
            return new Tuple2<>(accumulator._1() + 1, accumulator._2() + value.getSpeed());
        }

        @Override
        public Double getResult(Tuple2<Long, Double> accumulator) {
            long count = accumulator._1();
            // Guard against an empty accumulator so the result is never NaN.
            return count == 0 ? 0.0 : accumulator._2() / count;
        }

        // Combines two partial accumulators into one by adding counts and sums.
        @Override
        public Tuple2<Long, Double> merge(Tuple2<Long, Double> a, Tuple2<Long, Double> b) {
            return new Tuple2<>(a._1() + b._1(), a._2() + b._2());
        }
    }

    /**
     * Attaches the key and window bounds to the pre-aggregated average and emits
     * {@code key,windowStart,windowEnd,averageSpeed}.
     */
    private static final class WindowAverageFormatter
            extends ProcessWindowFunction<Double, String, String, TimeWindow> {

        private static final long serialVersionUID = 1L;

        @Override
        public void process(String key, Context context, Iterable<Double> averages, Collector<String> out) {
            TimeWindow window = context.window();
            // Only the first element is read; when combined with an AggregateFunction the
            // iterable is expected to hold the single aggregated value for the window.
            Double average = averages.iterator().next();
            out.collect(key + "," + window.getStart() + "," + window.getEnd() + "," + average);
        }
    }

}
